package com.flink.java.demo.batch;

/*
 * Since Flink 1.12 the DataSet API is no longer recommended; batch and stream
 * processing are both expressed with the unified DataStream API.
 */

import com.flink.java.demo.FlinkConstant;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

/**
 * Word-count demo built on the Flink DataStream API.
 *
 * <p>Reads a text file line by line, splits every line into words, groups the
 * words and prints a count per word.
 */
public class BatchWordCount {

    /**
     * Builds and runs the word-count pipeline on a local environment that also
     * starts the Flink web UI.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails or the trailing sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        // 1. Create a local execution environment with the web UI enabled.
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        // NOTE(review): the runtime mode is not set here, so Flink's default applies and
        // sum() emits an updated count for every incoming record. If only the final count
        // per word is wanted, consider RuntimeExecutionMode.BATCH — confirm intent.

        // 2. Read the input file line by line; each stream element is one line of text.
        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path(FlinkConstant.wordPath))
                .build();
        DataStreamSource<String> lineDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "myFileSource");

        // 3. flatMap (one in, many out): "hello flink" becomes (hello,1), (flink,1).
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = lineDS
                .flatMap((String line, Collector<Tuple2<String, Integer>> tuple2Collector) -> {
                    // Split on any run of whitespace so repeated spaces and tabs do not
                    // produce empty tokens in the middle of a line.
                    for (String word : line.split("\\s+")) {
                        // A blank line or leading whitespace still yields one empty token;
                        // skip it so "" is never counted as a word.
                        if (word.isEmpty()) {
                            continue;
                        }
                        // Emit the pair (word, 1) downstream.
                        tuple2Collector.collect(Tuple2.of(word, 1));
                    }
                })
                // The lambda uses Java generics, which are erased at compile time, so the
                // produced type has to be declared explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 4. Group by the word, i.e. field f0 of the tuple.
        KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOneDS.keyBy(key -> key.f0);

        // 5. Aggregate: sum tuple field 1 (the count) per key.
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1);

        // 6. Print the result.
        sumDS.print();

        env.execute();

        // Keep the JVM alive for 300 minutes after the job returns — presumably so the
        // local web UI stays reachable for inspection.
        TimeUnit.MINUTES.sleep(300);
    }
}

